package com.markhsiu.minimq.broker;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.markhsiu.minimq.channel.ChannelProcessor;
import com.markhsiu.minimq.message.Message;
import com.markhsiu.minimq.remote.Address;
import com.markhsiu.minimq.zookeeper.ZkUtils;

/**
 * Process-wide singleton registry that tracks, for one broker, the registered
 * consumers, the topics each consumer subscribes to, and the channel used to
 * reach each consumer.
 *
 * <p>Both registries are backed by {@link ConcurrentHashMap}; the subscription
 * mutators are additionally {@code synchronized} on the singleton instance.
 *
 * Created by Mark Hsiu on 2017/2/9.
 */
public class BrokerFactory {

	private static final Logger logger = LoggerFactory.getLogger(BrokerFactory.class);

	/**
	 * Broker this registry belongs to; {@code null} until {@link #setBroker(Broker)} is called.
	 */
	private Broker broker;

	/**
	 * Registered consumers: consumer ID -> (subscribed topic -> consumer ID).
	 */
	private final Map<String, Map<String, String>> consumers = new ConcurrentHashMap<>();

	/**
	 * Consumer connections: consumer ID -> channel processor for that consumer.
	 */
	private final Map<String, ChannelProcessor<Message>> consumerConnections = new ConcurrentHashMap<>();

	/**
	 * The singleton instance.
	 */
	private final static BrokerFactory factory = new BrokerFactory();

	private BrokerFactory() {
	}

	/**
	 * Returns the singleton instance.
	 *
	 * @return the shared {@code BrokerFactory}
	 */
	public static BrokerFactory instance() {
		return factory;
	}

	// --------------- consumer subscription management

	/**
	 * Unregisters a consumer together with all of its topic subscriptions.
	 * The consumer's connection entry is not touched.
	 *
	 * @param consumerID ID of the consumer to remove
	 * @return {@code true} if the consumer was registered and has been removed,
	 *         {@code false} if no such consumer was registered
	 */
	public synchronized boolean removeConsumer(String consumerID) {
		// Registered values are never null, so a non-null result means an entry was removed.
		if (consumers.remove(consumerID) != null) {
			return true;
		}

		logger.info("消费者移除失败：{}", consumerID);
		return false;
	}

	/**
	 * Returns the consumer registry.
	 *
	 * @return the live internal map (not a copy) of consumer ID to that
	 *         consumer's topic subscriptions
	 */
	public Map<String, Map<String, String>> getAllConsumers() {
		return consumers;
	}

	/**
	 * Registers a consumer with an empty set of subscriptions. If the consumer
	 * is already registered, the duplicate is logged and its existing
	 * subscriptions are kept as they are.
	 *
	 * @param consumerID ID of the consumer to register
	 * @return always {@code true}
	 */
	public synchronized boolean addConsumer(String consumerID) {
		// putIfAbsent keeps an already-registered consumer's subscriptions instead of
		// replacing them with an empty map.
		if (consumers.putIfAbsent(consumerID, new ConcurrentHashMap<>()) != null) {
			logger.info("同名消费者已存在：{}", consumerID);
		}
		return true;
	}

	/**
	 * Subscribes a consumer to a topic, registering the consumer first if it
	 * is not registered yet.
	 *
	 * @param consumerID ID of the subscribing consumer
	 * @param topic      topic to subscribe to
	 * @return always {@code true}
	 */
	public synchronized boolean addConsumerSubscribeTopic(String consumerID, String topic) {
		consumers.computeIfAbsent(consumerID, id -> new ConcurrentHashMap<>()).put(topic, consumerID);
		return true;
	}

	/**
	 * Cancels a consumer's subscription to a topic.
	 *
	 * @param consumerID ID of the consumer
	 * @param topic      topic to unsubscribe from
	 * @return {@code true} if the subscription existed and was removed,
	 *         {@code false} if the consumer is not registered or was not
	 *         subscribed to the topic
	 */
	public synchronized boolean removeConsumerSubscribeTopic(String consumerID, String topic) {
		Map<String, String> topics = consumers.get(consumerID);
		if (topics == null) {
			logger.info("消费者:{} 不存在或已经清楚", consumerID);
			return false;
		}
		if (topics.remove(topic) != null) {
			return true;
		}

		logger.info("消费者：{} 取消订阅的主题：{} 不存在", consumerID, topic);
		return false;
	}

	// --------------- consumer connection management

	/**
	 * Records the channel for a consumer, replacing any channel previously
	 * recorded under the same ID.
	 *
	 * @param consumerID ID of the consumer
	 * @param channel    channel processor associated with the consumer
	 */
	public void addConsumerConnection(String consumerID, ChannelProcessor<Message> channel) {
		consumerConnections.put(consumerID, channel);
	}

	/**
	 * Forgets the channel recorded for a consumer, if any.
	 *
	 * @param consumerID ID of the consumer
	 */
	public void removeConsumerConnection(String consumerID) {
		consumerConnections.remove(consumerID);
	}

	/**
	 * Looks up the channel recorded for a consumer.
	 *
	 * @param consumerID ID of the consumer
	 * @return the recorded channel processor, or {@code null} if none is recorded
	 */
	public ChannelProcessor<Message> getConsumerConnection(String consumerID) {
		return consumerConnections.get(consumerID);
	}

	// --------------- broker management

	/**
	 * Sets the broker this registry belongs to.
	 *
	 * @param broker the broker
	 * @return this instance, to allow call chaining
	 */
	BrokerFactory setBroker(Broker broker) {
		this.broker = broker;
		return this;
	}

	/**
	 * Returns the ID of the configured broker.
	 *
	 * @return the broker ID
	 * @throws NullPointerException if no broker has been set
	 */
	public String getBrokerID() {
		return broker.getID();
	}

	/**
	 * Returns the address of the configured broker.
	 *
	 * @return the broker address
	 * @throws NullPointerException if no broker has been set
	 */
	public Address getAddr() {
		return broker.getAddr();
	}

	/**
	 * Registers the configured broker (its ID and the string form of its
	 * address) with ZooKeeper.
	 *
	 * @throws NullPointerException if no broker has been set
	 */
	public void registerZk() {
		ZkUtils.registerBroker(broker.getID(), broker.getAddr().toString());
	}

}
